12. Look Away: The Code

Look Away: The Code

Below is the complete code for look_away , followed by a step-by-step explanation of what is happening. You can copy and paste this code into the look_away script you created in the directory:

~/catkin_ws/src/simple_arm/scripts

The look_away code

#!/usr/bin/env python

import math
import rospy
from sensor_msgs.msg import Image, JointState
from simple_arm.srv import *


class LookAway(object):
    def __init__(self):
        rospy.init_node('look_away')

        self.sub1 = rospy.Subscriber('/simple_arm/joint_states', 
                                     JointState, self.joint_states_callback)
        self.sub2 = rospy.Subscriber("rgb_camera/image_raw", 
                                     Image, self.look_away_callback)
        self.safe_move = rospy.ServiceProxy('/arm_mover/safe_move', 
                                     GoToPosition)

        self.last_position = None
        self.arm_moving = False

        rospy.spin()

    def uniform_image(self, image):
        return all(value == image[0] for value in image)

    def coord_equal(self, coord_1, coord_2):
        if coord_1 is None or coord_2 is None:
            return False
        tolerance = .0005
        result = abs(coord_1[0] - coord_2[0]) <= abs(tolerance)
        result = result and abs(coord_1[1] - coord_2[1]) <= abs(tolerance)
        return result

    def joint_states_callback(self, data):
        if self.coord_equal(data.position, self.last_position):
            self.arm_moving = False
        else:
            self.last_position = data.position
            self.arm_moving = True

    def look_away_callback(self, data):
        if not self.arm_moving and self.uniform_image(data.data):
            try:
                rospy.wait_for_service('/arm_mover/safe_move')
                msg = GoToPositionRequest()
                msg.joint_1 = 1.57
                msg.joint_2 = 1.57
                response = self.safe_move(msg)

                rospy.logwarn("Camera detecting uniform image. \
                               Elapsed time to look at something nicer:\n%s", 
                               response)

            except rospy.ServiceException, e:
                rospy.logwarn("Service call failed: %s", e)



if __name__ == '__main__':
    try: 
        LookAway()
    except rospy.ROSInterruptException:
        pass

Look Away: The Code

The code: explained

Import statements

#!/usr/bin/env python

import math
import rospy
from sensor_msgs.msg import Image, JointState
from simple_arm.srv import *

The imported modules are similar to those in simple_arm , except this time, we have the Image message type being imported so that the camera data can be used.

The LookAway Class and __init__ method

class LookAway(object):
    def __init__(self):
        rospy.init_node('look_away')

        self.sub1 = rospy.Subscriber('/simple_arm/joint_states', 
                                     JointState, self.joint_states_callback)
        self.sub2 = rospy.Subscriber("rgb_camera/image_raw", 
                                     Image, self.look_away_callback)
        self.safe_move = rospy.ServiceProxy('/arm_mover/safe_move', 
                                     GoToPosition)

        self.last_position = None
        self.arm_moving = False

        rospy.spin()

We define a class for this node to better keep track of the robot arm's current movement state and position history. Just as in the node definitions before, the node is initialized using ropsy.init_node , and at the end of the method rospy.spin() is used to block until a shutdown request is received by the node.

The first subscriber, self.sub1 , subscribes to the /simple_arm/joint_states topic. The node is written to check the camera only when the arm is not moving, and by subscribing to /simple_arm/joint_states , changes in the position of the arm can be tracked. The message type for this topic is JointState , and with each message, the message data is passed to the joint_states_callback function.

The second subscriber, self.sub2 , subscribes to the /rgb_camera/image_raw topic. The message type here is Image , and with each message, the look_away_callback function is called.

A ServiceProxy is how rospy enables calling a service from a node. The ServiceProxy here is created using the name of the service you wish to call along with the service class definition: in this case /arm_mover/safe_move and GoToPosition . The actual calls to the service will take place in the look_away_callback method below.

The helper methods

    def uniform_image(self, image):
        return all(value == image[0] for value in image)

    def coord_equal(self, coord_1, coord_2):
        if coord_1 is None or coord_2 is None:
            return False
        tolerance = .0005
        result = abs(coord_1[0] - coord_2[0]) <= abs(tolerance)
        result = result and abs(coord_1[1] - coord_2[1]) <= abs(tolerance)
        return result

There are two helper methods defined in the code: uniform_image and coord_equal . The uniform_image method takes an image as input and checks if all color values in the image are the same as the value of the first pixel. This essentially checks that all the color values in the image are the same.

The coord_equal method returns True if the coordinates coord_1 and coord_2 have equal components up to the specified tolerance.

The callback functions

    def joint_states_callback(self, data):
        if self.coord_equal(data.position, self.last_position):
            self.arm_moving = False
        else:
            self.last_position = data.position
            self.arm_moving = True

    def look_away_callback(self, data):
        if not self.arm_moving and self.uniform_image(data.data):
            try:
                rospy.wait_for_service('/arm_mover/safe_move')
                msg = GoToPositionRequest()
                msg.joint_1 = 1.57
                msg.joint_2 = 1.57
                response = self.safe_move(msg)

                rospy.logwarn("Camera detecting uniform image. \
                               Elapsed time to look at something nicer:\n%s", 
                               response)

            except rospy.ServiceException, e:
                rospy.logwarn("Service call failed: %s", e)

When self.sub1 receives a message on /simple_arm/joint_states topic, the message is passed to the joint_states_callback in the variable data . The joint_states_callback uses the coord_equal helper method to check if the current joint states provided in data are the same as the previous joint states, which are stored in self.last_position . If the current and previous joint states are the same (up to the specified tolerance), then the the arm has stopped moving, so the self.arm_moving flag is set to False . If the current and previous joint states are different, then the arm is still moving. In this case, the method updates self.last_position with current position data and sets self.arm_moving to True .

The look_away_callback is receiving data from the /rgb_camera/image_raw topic. The first line of this method verifies that the arm is not moving and also checks if the the image is uniform. If the arm isn't moving and the image is uniform, then a GoToPositionRequest() message is created and sent using the safe_move service, moving both joint angles to 1.57 . The method also logs a message warning you that the camera has detected a uniform image along with the elapsed time to return to a nicer image.